package com.dft.gmall.realtime.common.util;

import com.dft.gmall.realtime.common.constant.Constant;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * Factory methods that build Flink sources emitting {@code String} records: one backed by Kafka
 * and one backed by MySQL CDC. Connection settings are read from {@link Constant}.
 *
 * @author shengxuan
 */
public class FlinkSourceUtil {

  /**
   * Builds a Kafka source that reads {@code topic} as consumer group {@code groupId}.
   *
   * <p>The source connects to {@link Constant#KAFKA_BROKERS}, starts from the earliest offsets,
   * and decodes only the record value, as UTF-8 text.
   *
   * @param groupId Kafka consumer group id passed to the source builder
   * @param topic name of the topic to subscribe to
   * @return a configured {@link KafkaSource} producing the record values as strings
   */
  public static KafkaSource<String> getKafkaSource(String groupId, String topic) {
    DeserializationSchema<String> valueDecoder = new NullSafeUtf8Schema();

    return KafkaSource.<String>builder()
        .setBootstrapServers(Constant.KAFKA_BROKERS)
        .setGroupId(groupId)
        .setTopics(topic)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(valueDecoder)
        .build();
  }

  /**
   * Builds a MySQL CDC source that captures a single table and emits each change event through
   * {@link JsonDebeziumDeserializationSchema}.
   *
   * <p>Host, port and credentials come from {@link Constant}. The source is created with {@code
   * StartupOptions.initial()}.
   *
   * @param databaseName database to capture; also used as the qualifier of the table name
   * @param tableName table inside {@code databaseName} to capture
   * @return a configured {@link MySqlSource} producing string records
   */
  public static MySqlSource<String> getMySqlSource(String databaseName, String tableName) {
    // The table list expects names qualified as "<database>.<table>".
    String qualifiedTable = databaseName + "." + tableName;

    return MySqlSource.<String>builder()
        .hostname(Constant.MYSQL_HOST)
        .port(Constant.MYSQL_PORT)
        .username(Constant.MYSQL_USER_NAME)
        .password(Constant.MYSQL_PASSWORD)
        .jdbcProperties(mysqlJdbcProperties())
        .databaseList(databaseName)
        .tableList(qualifiedTable)
        .deserializer(new JsonDebeziumDeserializationSchema())
        .startupOptions(StartupOptions.initial())
        .build();
  }

  /** Creates the JDBC properties handed to the MySQL CDC source. */
  private static Properties mysqlJdbcProperties() {
    Properties jdbcProps = new Properties();
    jdbcProps.setProperty("useSSL", "false");
    jdbcProps.setProperty("allowPublicKeyRetrieval", "true");
    return jdbcProps;
  }

  /**
   * Decodes a raw payload as a UTF-8 string, returning {@code null} for a {@code null} payload
   * instead of failing.
   */
  private static final class NullSafeUtf8Schema implements DeserializationSchema<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String deserialize(byte[] message) throws IOException {
      return message == null ? null : new String(message, StandardCharsets.UTF_8);
    }

    /** The stream is unbounded from this schema's point of view: no element ends it. */
    @Override
    public boolean isEndOfStream(String nextElement) {
      return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
      return Types.STRING;
    }
  }
}
